import subprocess
import sys
# 必要なライブラリをリストにまとめる
= [
required_libraries 'json',
'pandas',
'matplotlib',
'pytz',
'ipywidgets',
'numpy',
'seaborn',
'ipyfilechooser',
'plotly'
]
# ライブラリのインストール関数
def install_and_import(library):
try:
__import__(library)
except ImportError:
"-m", "pip", "install", library])
subprocess.check_call([sys.executable, __import__(library)
# 各ライブラリのインストールとインポート
for library in required_libraries:
install_and_import(library)
# インポート文
import json
import pandas as pd
import os
import matplotlib.pyplot as plt
'font.family'] = 'Yu Mincho', #'Hiragino Kaku Gothic ProN', #'Meiryo', #'Noto Sans CJK JP'
plt.rcParams[import matplotlib.dates as mdates
import pytz
import ipywidgets as widgets
import numpy as np
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px
import plotly.io as pio
from ipywidgets import DatePicker, Button, HBox
from matplotlib.lines import Line2D
from pytz import timezone
from IPython.display import display, clear_output
from ipyfilechooser import FileChooser
from datetime import datetime
from datetime import timedelta
print("All libraries are installed and imported successfully.")
Sleep Analyze Jupyter Notebook
このスクリプトはGitHubからダウンロードできます。
はじめに
- このノートブックでは、Google Fitのデータを元に睡眠データを分析します
- Google Fitからデータをダウンロードしてください
- Google Fitからデータをダウンロードする方法が分からない方は以下のドキュメント(Google Fitからデータをダウンロードする方法)をご覧ください。
- このノートブックは、筆者のような複数のデバイスを使って睡眠記録をつけている方でも問題なく分析が行えます
- セルは一括で実行せず、1つずつ実行してください
データ分析の流れ
モジュールのインポート
データの取り込み
(手動の睡眠記録がある場合)データセットの選択
(必要であれば)データをCSVファイルとしてエクスポート
アクトグラムを用いた四半期ごとの睡眠記録の可視化
四半期ごとの統計
任意の日付の睡眠記録の可視化
In [2]:
睡眠データの選択
GoogleからダウンロードしたZipファイルを解凍します。
takeout-YYYYMMDDxxxxxx.zip
のような名前のZipファイルです
Takeout
>Fit
>すべてのデータ
の中にraw_com.google.sleep.segment
という文字列を含むJSONファイルがあるはずなので、それらをすべて任意のフォルダーにコピーします。raw_com.google.sleep.segment
をフォルダ内検索すると便利です
任意のフォルダーにある、全ての睡眠データ(
raw_com.google.sleep.segment
)を以下のセルで読み込みます。- あるいは直接、解凍したtakeoutファイルから、
Takeout
>Fit
>すべてのデータ
と進み、raw_com.google.sleep.segment
という文字列を含むJSONファイルを選択します(フォルダ内検索を推奨します)
- あるいは直接、解凍したtakeoutファイルから、
In [4]:
# ファイルアップロードウィジェットの作成
= widgets.FileUpload(
uploader ='.json', # JSONファイルのみを許可
accept=True, # 複数のファイルをアップロード可能
multiple='Upload JSON files'
description
)
= False
file_loading_flag
# アップロードされたデータを処理する関数
def process_uploaded_files(change):
global file_loading_flag
# 処理中メッセージを表示
with output:
clear_output()print("ファイルの処理中です。次のセルには進まないでください。")
# 出力を即座にフラッシュ
sys.stdout.flush()
try:
for file_info in change['new']:
print(f"Processing {file_info['name']}")
# 出力を即座にフラッシュ
sys.stdout.flush() = file_info['content']
content = json.loads(content.tobytes().decode('utf-8'))
import_data = load_and_process_sleep_data(import_data, 'Type of Sleep')
df print("ファイルの読み込みが完了しました。次のセルに進んでください")
= True
file_loading_flag
with output:
if file_loading_flag is True:
clear_output()print("ファイルの読み込みが完了しました。次のセルに進んでください。")
# 出力を即座にフラッシュ
sys.stdout.flush()
except Exception as e:
with output:
clear_output()print(f"エラーが発生しました: {e}")
# 出力を即座にフラッシュ
sys.stdout.flush()
# JSONデータをDataFrameに変換するための関数
def load_and_process_sleep_data(import_data, type_value):
= import_data['Data Source']
data_source = import_data['Data Points']
data_points = pd.DataFrame([{
df 'data_source': data_source,
'start_time_ns': dp['startTimeNanos'],
'end_time_ns': dp['endTimeNanos'],
'sleep_state': dp['fitValue'][0]['value']['intVal'],
'modified_time_ms': dp['modifiedTimeMillis'],
'Type': type_value
for dp in data_points])
} 'start_time'] = pd.to_datetime(df['start_time_ns'], unit='ns')
df['end_time'] = pd.to_datetime(df['end_time_ns'], unit='ns')
df[return df
# 出力ウィジェットの作成
= widgets.Output()
output
# 初期メッセージの表示
with output:
print("ファイルの処理が完了するまで、次のセルには進まないでください")
# アップロードイベントに関数をバインド
='value')
uploader.observe(process_uploaded_files, names
# ウィジェットの表示
display(uploader)
display(output)
In [5]:
def parse_datetime_with_format(dt_series):
= dt_series[dt_series.astype(str).str.contains(r"\.\d+")]
dt_series_with_ms = dt_series[~dt_series.astype(str).str.contains(r"\.\d+")]
dt_series_without_ms = pd.to_datetime(dt_series_with_ms, format='%Y-%m-%d %H:%M:%S.%f', errors='coerce')
parsed_with_ms = pd.to_datetime(dt_series_without_ms, format='%Y-%m-%d %H:%M:%S', errors='coerce')
parsed_without_ms return pd.concat([parsed_with_ms, parsed_without_ms]).sort_index()
# アップロードされたファイル名とデータの取得
= uploader.value uploaded_files
手動睡眠記録の選択
- 睡眠記録には、自動で睡眠を検知するタイプ(主にスマートウォッチや睡眠マットなど)のものと、スマートフォンアプリなどを用いて手動で睡眠記録の開始・停止をするものの2種類があります
- (このノートブックは、両者に対応しているだけでなく、複数のデバイスを用いて睡眠記録をつけている場合でも問題なく分析を行うことができます)
- もし、手動で睡眠記録を開始・停止している場合、以下のセルでその睡眠データにチェックを入れてください
- (よく分からない場合は特に何もせず、「決定(データを処理)」ボタンを押してください)
In [7]:
# 説明文を表示
= widgets.Label('もし手動で睡眠記録を開始/停止したデータセットがあれば該当するものにチェックを入れてください。')
description_label
display(description_label)
# 空のデータフレームを初期化
= pd.DataFrame()
combined_data
# ファイルと対応するチェックボックスを表示
= []
checkboxes for file_details in uploader.value:
= widgets.Checkbox(
cb =False,
value='',
description=False
disabled
)= widgets.Label(file_details['name'])
label = widgets.HBox([cb, label])
box
checkboxes.append(cb)
display(box)
# プロセスボタンを作成
= widgets.Button(description="決定(データを処理)")
process_button
# ボタンのイベントハンドラー
def on_button_clicked(b):
=True)
clear_output(waitglobal combined_data
for cb, file_details in zip(checkboxes, uploader.value):
= file_details['name']
filename = file_details['content']
content = json.loads(content.tobytes().decode('utf-8'))
sleep_data
# チェックボックスの値に応じてデータタイプを設定
= 'Manual' if cb.value else 'Auto'
type_column_value = load_and_process_sleep_data(sleep_data, type_column_value)
data = pd.concat([combined_data, data], ignore_index=True)
combined_data
# データ処理後の状態を表示
print("Data processing complete. Dataframe contains:", combined_data.shape[0], "rows.")
process_button.on_click(on_button_clicked) display(process_button)
In [8]:
# 日時データの解析
'start_time'] = parse_datetime_with_format(combined_data['start_time'])
combined_data['end_time'] = parse_datetime_with_format(combined_data['end_time'])
combined_data[
# データを時系列順にソート
= combined_data.sort_values(by='start_time')
combined_data
# 各睡眠データポイントの次の開始時刻を計算
'next_start_time'] = combined_data['start_time'].shift(-1)
combined_data[
# ギャップを計算(分単位)
'gap'] = (combined_data['next_start_time'] - combined_data['end_time']).dt.total_seconds() / 60
combined_data[
# 新しいセッションの開始を示すフラグを設定(ギャップが120分以上の場合)
'new_session_flag'] = (combined_data['gap'] > 119).astype(int)
combined_data[
# session_idを累積和で割り当て
'session_id'] = combined_data['new_session_flag'].shift(1).fillna(0).cumsum().astype(int)
combined_data[
# 'Type'が'Manual'のデータを抽出して、各セッションの最初のstart_timeをin_bed_timeとして定義
= combined_data[combined_data['Type'] == 'Manual']
manual_sleep_data = manual_sleep_data.groupby('session_id').first().reset_index()
in_bed_times = in_bed_times[['session_id', 'start_time']]
in_bed_times ={'start_time': 'in_bed_time'}, inplace=True)
in_bed_times.rename(columns
# 全データにin_bed_timeをマージ
= combined_data.merge(in_bed_times, on='session_id', how='left')
combined_data
# Extract relevant columns: 'in_bed_time', 'expanded_start_time', 'expanded_end_time', 'majority_sleep_state'
= combined_data[['data_source', 'Type', 'in_bed_time', 'start_time', 'end_time', 'sleep_state', 'session_id']]
selected_columns_data
# DataFrameを直接次のステップで使用
= selected_columns_data # これが分析や可視化に使われるデータフレーム
sleep_data
# 各セッションの最初のstart_timeと最後のend_timeを取得
= combined_data.groupby('session_id').agg({'start_time': 'first', 'end_time': 'last'}).reset_index()
session_start_end
# ミッドスリープタイムを計算
'mid_sleep_time'] = session_start_end['start_time'] + (session_start_end['end_time'] - session_start_end['start_time']) / 2
session_start_end[
# ミッドスリープタイムの期間を1分に設定
= timedelta(minutes=1)
one_minute 'mid_sleep_start'] = session_start_end['mid_sleep_time'] - one_minute / 2
session_start_end['mid_sleep_end'] = session_start_end['mid_sleep_time'] + one_minute / 2
session_start_end[
# ミッドスリープタイムのデータフレームを作成
= pd.DataFrame({
mid_sleep_data 'data_source': 'Mid_sleep_time',
'Type': 'Other',
'in_bed_time': pd.NaT,
'start_time': session_start_end['mid_sleep_start'],
'end_time': session_start_end['mid_sleep_end'],
'sleep_state': 10,
'session_id': session_start_end['session_id']
})
# sleep_dataにミッドスリープタイムのデータを追加
= pd.concat([sleep_data, mid_sleep_data], ignore_index=True) sleep_data
睡眠データをCSVファイルとして保存する(任意項目: 保存しなくても問題ありません)
- 以下のセルを実行することで、睡眠データをCSVファイルとして保存することが可能です
- 保存しなくとも分析に問題はありません
- CSVファイルは以下の構成となっています
data_source
: インポートしたデータセットの名前です(基本的にはGoogle Fitから取得されたデバイスの名前です)Mid_sleep_time
: 算出されたミッドスリープタイム
Type
: 基本的にはAuto
かManual
のどちらかが入りますAuto
: 自動で睡眠記録が開始・停止していることを意味しますManual
: 手動で睡眠記録が開始・停止していることを意味しますOther
: 算出されたミッドスリープタイムであることを意味します
in_bed_time
: 手動で睡眠記録を開始している場合、その開始時刻が入りますstart_time
: 各睡眠ステージが開始された時刻で、協定世界時(UTC)となっています(日本標準時ではないことに注意してください)end_time
: 各睡眠ステージが終了した時刻で、協定世界時(UTC)となっています(日本標準時ではないことに注意してください)sleep_state
: Google Fitで定められた睡眠ステージの値(1~6)と、算出したミッドスリープタイム(10)が入りますsession_id
: 1回の睡眠ごとに割り振られたIDです(end_time
から次のstart_time
までの間が2時間以上離れている場合、別の睡眠とみなしています)
睡眠ステージについて
睡眠ステージのタイプ | 値 |
---|---|
覚醒(睡眠サイクル中) | 1 |
睡眠 | 2 |
ベッド外 | 3 |
浅い睡眠 | 4 |
深い睡眠 | 5 |
レム睡眠 | 6 |
ミッドスリープタイム | 10 |
(おそらくですが)睡眠ステージ2はデータの信頼性が低く使われていない傾向にあります ミッドスリープタイムはオリジナル(Google Fitのデータ)にはない項目です
In [10]:
# CSVを保存する関数
# ユーザーのデスクトップパスを取得
= os.path.join(os.environ['USERPROFILE'], 'Desktop')
desktop_path
# CSVを保存する関数
def save_csv(sleep_data, path):
=False)
sleep_data.to_csv(path, indexreturn f'CSVファイルを {path}に保存しました。'
# 「Save」ボタンの動作を定義
def on_save_button_clicked(b):
with output:
clear_output()if not fc.selected:
print("CSVファイルの保存先を選択してください")
else:
# ここでDataFrameを保存
= save_csv(sleep_data, fc.selected) # dfは保存したいDataFrameの変数名
result print(result)
# ファイル選択ダイアログを設定
= FileChooser(desktop_path)
fc = 'sleep_data.csv'
fc.default_filename = True
fc.use_dir_icons
# 「Save」ボタンの作成
= widgets.Button(
save_button ='Save',
description='',
button_style='Click to save the CSV file',
tooltip='check'
icon
)
save_button.on_click(on_save_button_clicked)
# 出力エリアを設定
= widgets.Output()
output
# ウィジェットを表示
display(fc, save_button, output)
アクトグラムを表示するための準備を行うセル
アクトグラムとは、睡眠周期を可視化するグラフのことです。
In [12]:
# 日またぎを処理する関数
def adjust_end_time(start, end):
if end < start:
+= 1440 # 翌日にまたがる場合は24時間分(分)を加算
end return end
def convert_to_jst_if_needed(column):
# タイムゾーン情報を確認し、必要に応じて変換を行う
if column.dt.tz is None:
# タイムゾーン情報がない場合、UTCとして解釈し、JSTに変換
return pd.to_datetime(column, utc=True).dt.tz_convert('Asia/Tokyo')
elif str(column.dt.tz) == 'UTC':
# タイムゾーンがUTCであれば、JSTに変換
return column.dt.tz_convert('Asia/Tokyo')
elif str(column.dt.tz) != 'Asia/Tokyo':
# タイムゾーンがJSTでない他のタイムゾーンであれば、JSTに変換
return column.dt.tz_convert('Asia/Tokyo')
else:
# 既にJSTであればそのまま返す
return column
def convert_sleep_data_to_jst(sleep_data):
# sleep_dataをコピーしてタイムゾーンを変換
= sleep_data.copy()
jst_sleep_data 'start_time'] = convert_to_jst_if_needed(jst_sleep_data['start_time'])
jst_sleep_data['end_time'] = convert_to_jst_if_needed(jst_sleep_data['end_time'])
jst_sleep_data[
# 明示的に datetime64[ns, Asia/Tokyo] にキャスト
'start_time'] = jst_sleep_data['start_time'].astype('datetime64[ns, Asia/Tokyo]')
jst_sleep_data['end_time'] = jst_sleep_data['end_time'].astype('datetime64[ns, Asia/Tokyo]')
jst_sleep_data[
return jst_sleep_data
def plot_actogram(sleep_data, start_date, end_date):
# タイムゾーン変換後のデータを取得
= convert_sleep_data_to_jst(sleep_data)
jst_sleep_data
# 指定された期間でデータをフィルタリング
= jst_sleep_data[
filtered_data 'start_time'] >= pd.Timestamp(start_date).tz_localize('Asia/Tokyo')) &
(jst_sleep_data['end_time'] <= pd.Timestamp(end_date).tz_localize('Asia/Tokyo'))
(jst_sleep_data[
].copy()
if filtered_data.empty:
print(f"No data available to plot between {start_date} and {end_date}.")
return
# 日またぎを考慮した時間の計算
'start_minutes'] = filtered_data['start_time'].apply(lambda dt: dt.hour * 60 + dt.minute)
filtered_data['end_minutes'] = filtered_data.apply(
filtered_data[lambda row: adjust_end_time(row['start_minutes'], row['start_minutes'] + (row['end_time'] - row['start_time']).seconds // 60), axis=1)
= {1: '#e0ffff', 2: '#b3e5fc', 3: '#ff5252', 4: '#03a9f4', 5: '#303f9f', 6: '#ab47bc', 10: 'yellow'}
color_map 'color'] = filtered_data['sleep_state'].map(color_map)
filtered_data[
= max(1, (pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 1)
num_days = plt.subplots(figsize=(20, num_days * 0.4))
fig, ax for _, row in filtered_data.iterrows():
= (row['start_time'] - pd.Timestamp(start_date).tz_localize('Asia/Tokyo')).days
day_of_week 'start_minutes'], row['end_minutes']], [day_of_week, day_of_week], color=row['color'], alpha=0.7)
ax.plot([row['start_minutes'] + 1440, row['end_minutes'] + 1440], [day_of_week + 1, day_of_week + 1], color=row['color'], alpha=0.7)
ax.plot([row[
0, 2880)
ax.set_xlim(0, num_days)
ax.set_ylim(range(num_days))
ax.set_yticks('Asia/Tokyo') + pd.Timedelta(days=x)).strftime('%Y-%m-%d') for x in range(num_days)])
ax.set_yticklabels([(pd.Timestamp(start_date).tz_localize('Time')
ax.set_xlabel('Days from Start Date')
ax.set_ylabel(f'Actogram from {start_date} to {end_date}')
plt.title(True)
plt.grid(=[i * 60 for i in range(49)], labels=[f'{(i % 24):02d}:00' if i % 2 == 0 else '' for i in range(49)], rotation=45)
plt.xticks(ticks
plt.show()
# データセットの範囲確認
= convert_sleep_data_to_jst(sleep_data)
jst_sleep_data = jst_sleep_data['start_time'].min().strftime('%Y-%m-%d')
start_date = jst_sleep_data['start_time'].max().strftime('%Y-%m-%d')
end_date print(f"This dataset contains data from {start_date} to {end_date}.")
↑ に表示されたのが、JSONファイルから読み込まれたデータ範囲です。
This dataset contains data from
YYYY-MM-DD
toYYYY-MM-DD
以下のセルでは、 - JSONファイルに含まれていたデータ期間に応じて、四半期(3ヶ月)ごとにアクトグラムがプロットされます - 1年につき4枚グラフが出るので、含まれているデータ期間が長い場合は、全部のグラフが出力されるまで時間がかかります - 任意の期間のアクトグラムを表示することも可能です
アクトグラムを表示せず、次のセルに進んでも構いません。
In [14]:
# 四半期毎にデータをプロット
= jst_sleep_data['start_time'].dt.year.min()
start_year = jst_sleep_data['start_time'].dt.year.max()
end_year = jst_sleep_data['start_time'].max()
last_date
for year in range(start_year, end_year + 1):
for quarter in range(1, 5):
= 3 * quarter - 2
start_month = 3 * quarter
end_month = pd.Timestamp(year=year, month=start_month, day=1).tz_localize('Asia/Tokyo')
quarter_start_date = pd.Timestamp(year=year, month=end_month, day=1).tz_localize('Asia/Tokyo') + pd.DateOffset(months=1) - pd.DateOffset(days=1)
quarter_end_date
if quarter_start_date > last_date:
break # この四半期の開始日がデータセットの最後の日を超えている場合はスキップ
if quarter_end_date > last_date:
= last_date # 四半期の終了日がデータセットの最後の日を超えている場合は調整
quarter_end_date
'%Y-%m-%d'), quarter_end_date.strftime('%Y-%m-%d')) plot_actogram(jst_sleep_data, quarter_start_date.strftime(
任意の期間に絞ったアクトグラム
- 任意の期間に絞ったアクトグラムを、ブラウザ上に表示するセルです
- データの期間は四半期(3ヶ月)を推奨しています
- 四半期より長い期間を選択した場合、Y軸の文字が潰れてしまいます
- このセルを実行しなくても問題はありません
In [16]:
# 任意の期間のアクトグラム
def plot_interactive_actogram(sleep_data, start_date, end_date):
# タイムゾーン変換後のデータを取得
= convert_sleep_data_to_jst(sleep_data)
jst_sleep_data
# 指定された期間でデータをフィルタリング
= jst_sleep_data[
filtered_data 'start_time'] >= pd.Timestamp(start_date).tz_localize('Asia/Tokyo')) &
(jst_sleep_data['end_time'] <= pd.Timestamp(end_date).tz_localize('Asia/Tokyo'))
(jst_sleep_data[
].copy()
if filtered_data.empty:
print(f"No data available to plot between {start_date} and {end_date}.")
return
# 日またぎを考慮した時間の計算
'start_minutes'] = filtered_data['start_time'].apply(lambda dt: dt.hour * 60 + dt.minute)
filtered_data['end_minutes'] = filtered_data.apply(
filtered_data[lambda row: adjust_end_time(row['start_minutes'], row['start_minutes'] + (row['end_time'] - row['start_time']).seconds // 60), axis=1)
= {
color_map 1: '#e0ffff', # 覚醒(睡眠サイクル中)
2: '#b3e5fc', # 睡眠
3: '#ff5252', # ベッド外
4: '#03a9f4', # 浅い睡眠
5: '#303f9f', # 深い睡眠
6: '#ab47bc', # レム睡眠
10: 'black' # ミッドスリープタイム(色を強調)
}= {
sleep_stage_labels 1: '覚醒(睡眠サイクル中)',
2: '睡眠',
3: 'ベッド外',
4: '浅い睡眠',
5: '深い睡眠',
6: 'レム睡眠',
10: 'ミッドスリープタイム'
}'color'] = filtered_data['sleep_state'].map(color_map)
filtered_data[
= max(1, (pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 1)
num_days = go.Figure()
fig
for sleep_state, color in color_map.items():
= filtered_data[filtered_data['sleep_state'] == sleep_state]
sleep_state_data if not sleep_state_data.empty:
for _, row in sleep_state_data.iterrows():
= (row['start_time'] - pd.Timestamp(start_date).tz_localize('Asia/Tokyo')).days
day_of_week = 7.5 if sleep_state == 10 else 5 # ミッドスリープタイムの場合は線の太さを15に設定
line_width = 0 if sleep_state == 10 else 0 # ミッドスリープタイムの場合はy座標をさらにオフセット
y_offset = 1 if sleep_state == 10 else 0.5 # ミッドスリープタイム以外は透明度を0.3に設定
opacity
fig.add_trace(go.Scatter(=[row['start_minutes'], row['end_minutes']],
x=[day_of_week + y_offset, day_of_week + y_offset],
y='lines',
mode=dict(color=row['color'], width=line_width),
line=sleep_stage_labels[sleep_state],
name=f"{row['start_time'].strftime('%Y-%m-%d %H:%M')} to {row['end_time'].strftime('%Y-%m-%d %H:%M')}",
text='text',
hoverinfo=opacity
opacity
))
fig.add_trace(go.Scatter(=[row['start_minutes'] + 1440, row['end_minutes'] + 1440],
x=[day_of_week + 1 + y_offset, day_of_week + 1 + y_offset],
y='lines',
mode=dict(color=row['color'], width=line_width),
line=sleep_stage_labels[sleep_state],
name=f"{row['start_time'].strftime('%Y-%m-%d %H:%M')} to {row['end_time'].strftime('%Y-%m-%d %H:%M')}",
text='text',
hoverinfo=opacity
opacity
))
# 凡例を統合
= set()
unique_labels lambda trace: trace.update(showlegend=False) if trace.name in unique_labels else unique_labels.add(trace.name))
fig.for_each_trace(
fig.update_layout(=f'Interactive Actogram from {start_date} to {end_date}',
title='Time',
xaxis_title='Days from Start Date',
yaxis_title=dict(
xaxis='array',
tickmode=[i * 60 for i in range(49)],
tickvals=[f'{(i % 24):02d}:00' if i % 2 == 0 else '' for i in range(49)],
ticktextrange=[0, 2880]
),=dict(
yaxis=list(range(num_days)),
tickvals=[(pd.Timestamp(start_date).tz_localize('Asia/Tokyo') + pd.Timedelta(days=x)).strftime('%Y-%m-%d') for x in range(num_days)],
ticktextrange=[0, num_days],
=dict(size=10) # Y軸ラベルの文字サイズを小さく
tickfont
),='closest',
hovermode=dict(
legend='constant'
itemsizing
)
)
file='sleep_data_plot.html', auto_open=True)
pio.write_html(fig,
# ウィジェットの作成
= widgets.DatePicker(
start_date_picker ='Start Date',
description=False
disabled
)= widgets.DatePicker(
end_date_picker ='End Date',
description=False
disabled
)= widgets.Button(
interactive_button ='Plot Interactive Actogram',
description='info',
button_style='Click to plot the interactive actogram',
tooltip='line-chart'
icon
)= widgets.Label(
notice_label ='データ範囲は四半期(3ヶ月)程度にしてください。それ以上の期間を指定すると文字が潰れて読めなくなります。また、グラフの作成には少し時間がかかります。'
value
)
# ボタンがクリックされたときの動作
def on_button_clicked(b):
= start_date_picker.value
start_date = end_date_picker.value
end_date if start_date is not None and end_date is not None:
plot_interactive_actogram(sleep_data, start_date, end_date)else:
print("Please select both start and end dates.")
interactive_button.on_click(on_button_clicked)
# ウィジェットの表示
display(notice_label, start_date_picker, end_date_picker, interactive_button)
睡眠の統計分析を行うセル
以下のセルでは、 1. 四半期ごとの睡眠時間及び睡眠の質の推移の可視化(折れ線グラフ) - 1枚 2. 四半期の範囲で曜日別の睡眠時間及び睡眠の質の可視化(箱ひげ図) - 1年につき4枚 を行います。
含まれているデータ期間が長い場合は、全部のグラフが出力されるまで時間がかかります。 そのため、実行せず、次のセルに進んでも構いません。
In [18]:
def calculate_sleep_quality(sleep_data):
# 各セッションの睡眠時間を計算
= sleep_data.groupby('session_id').agg(
session_start_end =('start_time', 'min'),
start_time=('end_time', 'max')
end_time
).reset_index()'sleep_duration_total'] = (session_start_end['end_time'] - session_start_end['start_time']).dt.total_seconds() / 3600
session_start_end[
# 深い睡眠の割合を計算
'sleep_duration'] = (sleep_data['end_time'] - sleep_data['start_time']).dt.total_seconds() / 3600
sleep_data[= sleep_data[sleep_data['sleep_state'] == 5] # 深い睡眠
deep_sleep_data = deep_sleep_data.groupby('session_id')['sleep_duration'].sum().reset_index()
deep_sleep_duration
# 列名を変更
={'sleep_duration': 'sleep_duration_deep'}, inplace=True)
deep_sleep_duration.rename(columns
= pd.merge(session_start_end, deep_sleep_duration, on='session_id', how='left')
sleep_quality 'sleep_quality'] = sleep_quality['sleep_duration_deep'].fillna(0) / sleep_quality['sleep_duration_total']
sleep_quality[
return sleep_quality[['session_id', 'sleep_duration_total', 'sleep_quality']]
def calculate_quarterly_sleep_stats(sleep_data):
= calculate_sleep_quality(sleep_data)
sleep_quality = pd.merge(sleep_data, sleep_quality, on='session_id')
sleep_data
'quarter'] = sleep_data['start_time'].dt.to_period('Q')
sleep_data[= sleep_data.groupby('quarter').agg(
quarterly_stats =('sleep_duration_total', 'mean'),
avg_sleep_time=('sleep_quality', 'mean')
avg_sleep_quality
).reset_index()
return quarterly_stats
def plot_quarterly_sleep_stats(quarterly_stats):
= plt.subplots(figsize=(18, 6))
fig, ax1
'Quarter')
ax1.set_xlabel('Average Sleep Time (hours)', color='tab:blue')
ax1.set_ylabel('quarter'].astype(str), quarterly_stats['avg_sleep_time'], color='tab:blue', marker='o', label='Avg Sleep Time')
ax1.plot(quarterly_stats[='y', labelcolor='tab:blue')
ax1.tick_params(axis
= ax1.twinx()
ax2 'Average Sleep Quality', color='tab:orange')
ax2.set_ylabel('quarter'].astype(str), quarterly_stats['avg_sleep_quality'], color='tab:orange', marker='o', linestyle='--', label='Avg Sleep Quality')
ax2.plot(quarterly_stats[='y', labelcolor='tab:orange')
ax2.tick_params(axis
fig.tight_layout()='upper left', bbox_to_anchor=(0.1, 0.9))
fig.legend(loc'Quarterly Average Sleep Time and Quality')
plt.title(
plt.show()
def calculate_weekly_sleep_stats(sleep_data):
= calculate_sleep_quality(sleep_data)
sleep_quality = pd.merge(sleep_data, sleep_quality, on='session_id')
sleep_data
'quarter'] = sleep_data['start_time'].dt.to_period('Q')
sleep_data['weekday'] = sleep_data['start_time'].dt.day_name()
sleep_data[= sleep_data.groupby(['quarter', 'weekday']).agg(
weekly_stats =('sleep_duration_total', 'mean'),
avg_sleep_time=('sleep_quality', 'mean')
avg_sleep_quality
).reset_index()
return weekly_stats
def plot_weekly_sleep_stats_boxplot(sleep_data_jst):
= calculate_sleep_quality(sleep_data_jst)
sleep_quality = pd.merge(sleep_data_jst, sleep_quality, on='session_id')
sleep_data
'quarter'] = sleep_data['start_time'].dt.to_period('Q')
sleep_data['weekday'] = sleep_data['start_time'].dt.day_name()
sleep_data[= ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
weekdays = ['#3498db', '#3498db', '#3498db', '#3498db', '#3498db', '#e74c3c', '#e74c3c'] # 平日は青、土日は赤
weekday_colors = dict(zip(weekdays, weekday_colors))
weekday_palette
for quarter in sleep_data['quarter'].unique():
= sleep_data[sleep_data['quarter'] == quarter]
quarter_data
= plt.subplots(2, 1, figsize=(12, 12))
fig, (ax1, ax2)
# 睡眠時間の箱ひげ図
='weekday', y='sleep_duration_total', data=quarter_data, order=weekdays, palette=weekday_palette, ax=ax1, hue='weekday', dodge=False)
sns.boxplot(xf'Weekly Sleep Duration for {quarter}')
ax1.set_title('Weekday')
ax1.set_xlabel('Sleep Duration (hours)')
ax1.set_ylabel(=False) # レジェンドを非表示にする
ax1.legend([],[], frameon
# 睡眠の質の箱ひげ図
='weekday', y='sleep_quality', data=quarter_data, order=weekdays, palette=weekday_palette, ax=ax2, hue='weekday', dodge=False)
sns.boxplot(xf'Weekly Sleep Quality for {quarter}')
ax2.set_title('Weekday')
ax2.set_xlabel('Sleep Quality')
ax2.set_ylabel(=False) # レジェンドを非表示にする
ax2.legend([],[], frameon
plt.tight_layout()
plt.show()
# データの準備
= convert_sleep_data_to_jst(sleep_data)
sleep_data_jst
# 四半期ごとの統計を計算
= calculate_quarterly_sleep_stats(sleep_data_jst)
quarterly_stats
# 結果をプロット
plot_quarterly_sleep_stats(quarterly_stats)
# 曜日別の統計を計算
= calculate_weekly_sleep_stats(sleep_data_jst)
weekly_stats
# 結果を箱ひげ図でプロット
plot_weekly_sleep_stats_boxplot(sleep_data_jst)
任意の日付による、睡眠セッションの可視化
以下のセルでは、ユーザーが指定した日付の睡眠セッション分析し、睡眠ステージの推移をグラフに表示します。
In [20]:
def convert_to_jst_if_needed(column):
# タイムゾーン情報を確認し、必要に応じて変換を行う
if column.dt.tz is None:
# タイムゾーン情報がない場合、UTCとして解釈し、JSTに変換
return pd.to_datetime(column, utc=True).dt.tz_convert('Asia/Tokyo')
elif str(column.dt.tz) == 'UTC':
# タイムゾーンがUTCであれば、JSTに変換
return column.dt.tz_convert('Asia/Tokyo')
elif str(column.dt.tz) != 'Asia/Tokyo':
# タイムゾーンがJSTでない他のタイムゾーンであれば、JSTに変換
return column.dt.tz_convert('Asia/Tokyo')
else:
# 既にJSTであればそのまま返す
return column
def convert_sleep_data_to_jst(sleep_data):
# sleep_dataをコピーしてタイムゾーンを変換
= sleep_data.copy()
sleep_data_jst 'start_time'] = convert_to_jst_if_needed(sleep_data_jst['start_time'])
sleep_data_jst['end_time'] = convert_to_jst_if_needed(sleep_data_jst['end_time'])
sleep_data_jst[if 'in_bed_time' in sleep_data.columns:
'in_bed_time'] = convert_to_jst_if_needed(sleep_data_jst['in_bed_time'])
sleep_data_jst[else:
'in_bed_time'] = None
sleep_data_jst[
# 明示的に datetime64[ns, Asia/Tokyo] にキャスト
'start_time'] = sleep_data_jst['start_time'].astype('datetime64[ns, Asia/Tokyo]')
sleep_data_jst['end_time'] = sleep_data_jst['end_time'].astype('datetime64[ns, Asia/Tokyo]')
sleep_data_jst['in_bed_time'] = sleep_data_jst['in_bed_time'].astype('datetime64[ns, Asia/Tokyo]')
sleep_data_jst[
return sleep_data_jst
def create_session_data(sleep_data_jst):
# 各セッションの最終 'end_time' を取得して日付に変換
= sleep_data_jst.groupby('session_id')['end_time'].max().dt.date
session_dates = session_dates.reset_index()
session_dates ={'end_time': 'session_date'}, inplace=True)
session_dates.rename(columns
# 睡眠時間と睡眠潜時の計算
= sleep_data_jst.groupby('session_id').agg(
sleep_times =('end_time', lambda x: (x.max() - x.min()).total_seconds() / 3600),
sleep_time=('start_time', 'min')
start_time
)=True)
sleep_times.reset_index(inplace
# 睡眠潜時の計算
= sleep_data_jst.groupby('session_id').apply(
sleep_latency lambda group: calculate_sleep_latency(group[['in_bed_time', 'start_time', 'Type', 'sleep_state']]),
=False # 追加: グループ化列を適用操作から除外
include_groups='sleep_latency')
).reset_index(name
# 結合して全データを含むデータフレームを作成
= pd.merge(session_dates, sleep_times[['session_id', 'sleep_time']], on='session_id')
full_session_data = pd.merge(full_session_data, sleep_latency, on='session_id')
full_session_data
return full_session_data
def calculate_sleep_latency(group):
= group.sort_values(by='start_time')
group = group[(group['Type'] == 'Auto') & (group['sleep_state'] >= 4)]
auto_sleep_times if not auto_sleep_times.empty:
= auto_sleep_times['start_time'].iloc[0]
auto_sleep_time if pd.notna(group['in_bed_time'].iloc[0]) and group['in_bed_time'].iloc[0] <= group['start_time'].iloc[0]:
return (auto_sleep_time - group['in_bed_time'].iloc[0]).total_seconds() / 60
return np.nan
= convert_sleep_data_to_jst(sleep_data)
sleep_data_jst = create_session_data(sleep_data_jst)
full_session_data
# 日付選択ウィジェット
= DatePicker(description='Select Date', disabled=False)
date_picker
def on_prev_clicked(b):
= date_picker.value - pd.Timedelta(days=1) if date_picker.value else None
date_picker.value
def on_next_clicked(b):
= date_picker.value + pd.Timedelta(days=1) if date_picker.value else None
date_picker.value
= Button(description="Previous Day")
button_prev = Button(description="Next Day")
button_next
button_prev.on_click(on_prev_clicked)
button_next.on_click(on_next_clicked)
display(HBox([button_prev, button_next]))
display(date_picker)
# タイムゾーンを確認して適切に日付を表示する関数
def set_plot_title(ax, session_id, sleep_data_jst):
= pytz.timezone('Asia/Tokyo')
jst
= sleep_data_jst[sleep_data_jst['session_id'] == session_id]
session_data if not session_data.empty:
if session_data['start_time'].dt.tz:
= session_data['start_time'].min().astimezone(jst)
start_time_jst = session_data['end_time'].max().astimezone(jst)
end_time_jst else:
= session_data['start_time'].min().replace(tzinfo=pytz.utc)
start_time_utc = session_data['end_time'].max().replace(tzinfo=pytz.utc)
end_time_utc = start_time_utc.astimezone(jst)
start_time_jst = end_time_utc.astimezone(jst)
end_time_jst
= f"Sleep Session from {start_time_jst.strftime('%Y-%m-%d %H:%M')} to {end_time_jst.strftime('%Y-%m-%d %H:%M')}"
title
ax.set_title(title)else:
"No data available for this session")
ax.set_title(
# キャプションを追加する関数
def add_caption(ax, session_id, full_session_data):
= full_session_data[full_session_data['session_id'] == session_id].iloc[0]
record
# キャプションの初期部分
= f"睡眠時間: {record['sleep_time']:.2f} 時間\n"
caption
# sleep_latencyがNaNやマイナスでない場合のみ追加
if pd.notna(record['sleep_latency']) and record['sleep_latency'] >= 0:
+= f"睡眠潜時(布団に入ってから寝付くまでの時間): {record['sleep_latency']:.2f} 分"
caption
0.01, 0.95, caption, transform=ax.transAxes, fontsize=12, verticalalignment='top')
ax.text(
# 睡眠データをプロットする関数
def plot_sleep_data(session_id, sleep_data_jst, full_session_data):
= full_session_data[full_session_data['session_id'] == session_id]
session_info = sleep_data_jst[sleep_data_jst['session_id'] == session_id]
session_data
if not session_data.empty:
# 日時データのタイムゾーンを確認し、日本時間に設定
if session_data['start_time'].dt.tz is None:
'start_time'] = session_data['start_time'].dt.tz_localize('UTC').dt.tz_convert('Asia/Tokyo')
session_data[if session_data['end_time'].dt.tz is None:
'end_time'] = session_data['end_time'].dt.tz_localize('UTC').dt.tz_convert('Asia/Tokyo')
session_data[
= plt.subplots(figsize=(20, 7))
fig, ax = {3: 5, 1: 4, 4: 3, 6: 2, 5: 1, 10: 6}
stage_height = {1: '#e0ffff', 3: '#ff5252', 4: '#03a9f4', 5: '#303f9f', 6: '#ab47bc', 10:'yellow'}
stage_colors = session_data['data_source'].unique()
data_sources = len(data_sources)
source_count = 1 / source_count if source_count > 0 else 1
alpha_value
# プロットの時間を日本時間に合わせて設定
for index, row in session_data.iterrows():
= mdates.date2num(row['start_time'])
start_pos = mdates.date2num(row['end_time']) - start_pos
duration =start_pos, height=stage_height[row['sleep_state']], width=duration,
ax.bar(x=stage_colors.get(row['sleep_state'], '#FFFFFF'), edgecolor='black',
color='edge', alpha=alpha_value)
align
=timezone('Asia/Tokyo'))
ax.xaxis_date(tz=1))
ax.xaxis.set_major_locator(mdates.HourLocator(interval'%H:%M', tz=timezone('Asia/Tokyo')))
ax.xaxis.set_major_formatter(mdates.DateFormatter(0, 6)
ax.set_ylim(1, 2, 3, 4, 5, 6])
ax.set_yticks(['Deep Sleep', 'REM', 'Light Sleep', 'Awake', 'Out-of-bed', 'Mid Sleep Time'])
ax.set_yticklabels(['Time of Day')
ax.set_xlabel(
set_plot_title(ax, session_id, sleep_data_jst)
add_caption(ax, session_id, session_info)
plt.tight_layout()
plt.show()else:
print("No sleep data available for this session.")
# 日付変更時のイベントハンドラ
def on_date_change(change):
if change['new'] is not None:
= pd.to_datetime(change['new']).date()
selected_date = next((sid for sid, date in full_session_data.set_index('session_id')['session_date'].items() if date == selected_date), None)
session_id if session_id is not None:
plot_sleep_data(session_id, sleep_data_jst, full_session_data)else:
print("No sessions found for this date.")
='value') date_picker.observe(on_date_change, names